- ML Pipeline Automation
- ML pipeline automation orchestrates the entire machine learning workflow from data ingestion through model deployment, ensuring reproducibility, scalability, and reliability.
- Pipeline Components
- Data Ingestion
-
- Collecting data from multiple sources
- Data Processing
-
- Cleaning, transformation, feature engineering
- Model Training
-
- Training and hyperparameter tuning
- Validation
-
- Cross-validation and testing
- Deployment
-
- Moving models to production
- Monitoring
-
- Tracking performance metrics
- Orchestration Platforms
- Apache Airflow
-
- Workflow scheduling with DAGs
- Kubeflow
-
- Kubernetes-native ML workflows
- Jenkins
-
- CI/CD for ML pipelines
- Prefect
-
- Modern data flow orchestration
- Dagster
- Asset-driven orchestration Python Implementation import pandas as pd import numpy as np from sklearn . datasets import make_classification from sklearn . model_selection import train_test_split from sklearn . preprocessing import StandardScaler from sklearn . ensemble import RandomForestClassifier from sklearn . metrics import accuracy_score , f1_score import joblib import logging from datetime import datetime import json import os
Airflow imports
from airflow import DAG from airflow . operators . python import PythonOperator from airflow . operators . bash import BashOperator from airflow . utils . dates import days_ago
MLflow for tracking
import mlflow import mlflow . sklearn
Logging setup
logging . basicConfig ( level = logging . INFO ) logger = logging . getLogger ( name ) print ( "=== 1. Modular Pipeline Functions ===" )
Data ingestion
def ingest_data ( ** context ) : """Ingest and load data""" logger . info ( "Starting data ingestion..." ) X , y = make_classification ( n_samples = 2000 , n_features = 30 , n_informative = 20 , random_state = 42 ) data = pd . DataFrame ( X , columns = [ f'feature_ { i } ' for i in range ( X . shape [ 1 ] ) ] ) data [ 'target' ] = y
Save to disk
data_path
'/tmp/raw_data.csv' data . to_csv ( data_path , index = False ) context [ 'task_instance' ] . xcom_push ( key = 'data_path' , value = data_path ) logger . info ( f"Data ingested: { len ( data ) } rows" ) return { 'status' : 'success' , 'samples' : len ( data ) }
Data processing
def process_data ( ** context ) : """Clean and preprocess data""" logger . info ( "Starting data processing..." )
Get data path from previous task
task_instance
context [ 'task_instance' ] data_path = task_instance . xcom_pull ( key = 'data_path' , task_ids = 'ingest_data' ) data = pd . read_csv ( data_path )
Handle missing values
data
data . fillna ( data . mean ( ) )
Remove duplicates
data
data . drop_duplicates ( )
Remove outliers (simple approach)
numeric_cols
data . select_dtypes ( include = [ np . number ] ) . columns for col in numeric_cols : Q1 = data [ col ] . quantile ( 0.25 ) Q3 = data [ col ] . quantile ( 0.75 ) IQR = Q3 - Q1 data = data [ ( data [ col ]
= Q1 - 1.5 * IQR ) & ( data [ col ] <= Q3 + 1.5 * IQR ) ] processed_path = '/tmp/processed_data.csv' data . to_csv ( processed_path , index = False ) task_instance . xcom_push ( key = 'processed_path' , value = processed_path ) logger . info ( f"Data processed: { len ( data ) } rows after cleaning" ) return { 'status' : 'success' , 'rows_remaining' : len ( data ) }
Feature engineering
def engineer_features ( ** context ) : """Create new features""" logger . info ( "Starting feature engineering..." ) task_instance = context [ 'task_instance' ] processed_path = task_instance . xcom_pull ( key = 'processed_path' , task_ids = 'process_data' ) data = pd . read_csv ( processed_path )
Create interaction features
feature_cols
[ col for col in data . columns if col . startswith ( 'feature_' ) ] for i in range ( min ( 5 , len ( feature_cols ) ) ) : for j in range ( i + 1 , min ( 6 , len ( feature_cols ) ) ) : data [ f'interaction_ { i } _ { j } ' ] = data [ feature_cols [ i ] ] * data [ feature_cols [ j ] ]
Create polynomial features
for col in feature_cols [ : 5 ] : data [ f' { col } _squared' ] = data [ col ] ** 2 engineered_path = '/tmp/engineered_data.csv' data . to_csv ( engineered_path , index = False ) task_instance . xcom_push ( key = 'engineered_path' , value = engineered_path ) logger . info ( f"Features engineered: { len ( data . columns ) } total features" ) return { 'status' : 'success' , 'features' : len ( data . columns ) }
Train model
def train_model ( ** context ) : """Train ML model""" logger . info ( "Starting model training..." ) task_instance = context [ 'task_instance' ] engineered_path = task_instance . xcom_pull ( key = 'engineered_path' , task_ids = 'engineer_features' ) data = pd . read_csv ( engineered_path ) X = data . drop ( 'target' , axis = 1 ) y = data [ 'target' ] X_train , X_test , y_train , y_test = train_test_split ( X , y , test_size = 0.2 , random_state = 42 )
Scale features
scaler
StandardScaler ( ) X_train_scaled = scaler . fit_transform ( X_train ) X_test_scaled = scaler . transform ( X_test )
Train model
model
RandomForestClassifier ( n_estimators = 100 , max_depth = 15 , random_state = 42 ) model . fit ( X_train_scaled , y_train )
Evaluate
y_pred
model . predict ( X_test_scaled ) accuracy = accuracy_score ( y_test , y_pred ) f1 = f1_score ( y_test , y_pred )
Save model
model_path
'/tmp/model.pkl' scaler_path = '/tmp/scaler.pkl' joblib . dump ( model , model_path ) joblib . dump ( scaler , scaler_path ) task_instance . xcom_push ( key = 'model_path' , value = model_path ) task_instance . xcom_push ( key = 'scaler_path' , value = scaler_path )
Log to MLflow
with mlflow . start_run ( ) : mlflow . log_param ( 'n_estimators' , 100 ) mlflow . log_param ( 'max_depth' , 15 ) mlflow . log_metric ( 'accuracy' , accuracy ) mlflow . log_metric ( 'f1_score' , f1 ) mlflow . sklearn . log_model ( model , 'model' ) logger . info ( f"Model trained: Accuracy= { accuracy : .4f } , F1= { f1 : .4f } " ) return { 'status' : 'success' , 'accuracy' : accuracy , 'f1_score' : f1 }
Validate model
def validate_model ( ** context ) : """Validate model performance""" logger . info ( "Starting model validation..." ) task_instance = context [ 'task_instance' ] model_path = task_instance . xcom_pull ( key = 'model_path' , task_ids = 'train_model' ) engineered_path = task_instance . xcom_pull ( key = 'engineered_path' , task_ids = 'engineer_features' ) model = joblib . load ( model_path ) data = pd . read_csv ( engineered_path ) X = data . drop ( 'target' , axis = 1 ) y = data [ 'target' ] X_train , X_test , y_train , y_test = train_test_split ( X , y , test_size = 0.2 , random_state = 42 ) scaler_path = task_instance . xcom_pull ( key = 'scaler_path' , task_ids = 'train_model' ) scaler = joblib . load ( scaler_path ) X_test_scaled = scaler . transform ( X_test )
Validate
y_pred
model . predict ( X_test_scaled ) accuracy = accuracy_score ( y_test , y_pred ) validation_result = { 'status' : 'success' if accuracy
0.85 else 'failed' , 'accuracy' : accuracy , 'threshold' : 0.85 , 'timestamp' : datetime . now ( ) . isoformat ( ) } task_instance . xcom_push ( key = 'validation_result' , value = json . dumps ( validation_result ) ) logger . info ( f"Validation result: { validation_result } " ) return validation_result
Deploy model
def deploy_model ( ** context ) : """Deploy validated model""" logger . info ( "Starting model deployment..." ) task_instance = context [ 'task_instance' ] validation_result = json . loads ( task_instance . xcom_pull ( key = 'validation_result' , task_ids = 'validate_model' ) ) if validation_result [ 'status' ] != 'success' : logger . warning ( "Validation failed, deployment skipped" ) return { 'status' : 'skipped' , 'reason' : 'validation_failed' } model_path = task_instance . xcom_pull ( key = 'model_path' , task_ids = 'train_model' ) scaler_path = task_instance . xcom_pull ( key = 'scaler_path' , task_ids = 'train_model' )
Simulate deployment
deploy_path
'/tmp/deployed_model/' os . makedirs ( deploy_path , exist_ok = True ) import shutil shutil . copy ( model_path , os . path . join ( deploy_path , 'model.pkl' ) ) shutil . copy ( scaler_path , os . path . join ( deploy_path , 'scaler.pkl' ) ) logger . info ( f"Model deployed to { deploy_path } " ) return { 'status' : 'success' , 'deploy_path' : deploy_path }
2. Airflow DAG Definition
print ( "\n=== 2. Airflow DAG ===" ) dag_definition = ''' from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_args = { 'owner': 'ml-team', 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG( 'ml_pipeline_dag', default_args=default_args, description='End-to-end ML pipeline', schedule_interval='0 2 * * *', # Daily at 2 AM start_date=datetime(2024, 1, 1), catchup=False, ) as dag:
Task 1: Ingest Data
ingest = PythonOperator( task_id='ingest_data', python_callable=ingest_data, )
Task 2: Process Data
process = PythonOperator( task_id='process_data', python_callable=process_data, )
Task 3: Engineer Features
engineer = PythonOperator( task_id='engineer_features', python_callable=engineer_features, )
Task 4: Train Model
train = PythonOperator( task_id='train_model', python_callable=train_model, )
Task 5: Validate Model
validate = PythonOperator( task_id='validate_model', python_callable=validate_model, )
Task 6: Deploy Model
deploy = PythonOperator( task_id='deploy_model', python_callable=deploy_model, )
Define dependencies
ingest >> process >> engineer >> train >> validate >> deploy ''' print ( "Airflow DAG defined with 6 tasks" )
3. Pipeline execution summary
print ( "\n=== 3. Pipeline Execution ===" ) class PipelineOrchestrator : def init ( self ) : self . execution_log = [ ] self . start_time = None self . end_time = None def run_pipeline ( self ) : self . start_time = datetime . now ( ) logger . info ( "Starting ML pipeline execution" ) try :
Execute pipeline tasks
result1
ingest_data ( task_instance = self ) self . execution_log . append ( ( 'ingest_data' , result1 ) ) result2 = process_data ( task_instance = self ) self . execution_log . append ( ( 'process_data' , result2 ) ) result3 = engineer_features ( task_instance = self ) self . execution_log . append ( ( 'engineer_features' , result3 ) ) result4 = train_model ( task_instance = self ) self . execution_log . append ( ( 'train_model' , result4 ) ) result5 = validate_model ( task_instance = self ) self . execution_log . append ( ( 'validate_model' , result5 ) ) result6 = deploy_model ( task_instance = self ) self . execution_log . append ( ( 'deploy_model' , result6 ) ) self . end_time = datetime . now ( ) logger . info ( "Pipeline execution completed successfully" ) except Exception as e : logger . error ( f"Pipeline execution failed: { str ( e ) } " ) def xcom_push ( self , key , value ) : if not hasattr ( self , 'xcom_storage' ) : self . xcom_storage = { } self . xcom_storage [ key ] = value def xcom_pull ( self , key , task_ids ) : if hasattr ( self , 'xcom_storage' ) and key in self . xcom_storage : return self . xcom_storage [ key ] return None def get_summary ( self ) : duration = ( self . end_time - self . start_time ) . total_seconds ( ) if self . end_time else 0 return { 'start_time' : self . start_time . isoformat ( ) if self . start_time else None , 'end_time' : self . end_time . isoformat ( ) if self . end_time else None , 'duration_seconds' : duration , 'tasks_executed' : len ( self . execution_log ) , 'execution_log' : self . execution_log }
Execute pipeline
orchestrator
- PipelineOrchestrator
- (
- )
- orchestrator
- .
- run_pipeline
- (
- )
- summary
- =
- orchestrator
- .
- get_summary
- (
- )
- (
- "\n=== Pipeline Summary ==="
- )
- for
- key
- ,
- value
- in
- summary
- .
- items
- (
- )
- :
- if
- key
- !=
- 'execution_log'
- :
- (
- f"
- {
- key
- }
- :
- {
- value
- }
- "
- )
- (
- "\nTask Execution Log:"
- )
- for
- task_name
- ,
- result
- in
- summary
- [
- 'execution_log'
- ]
- :
- (
- f"
- {
- task_name
- }
- :
- {
- result
- }
- "
- )
- (
- "\nML pipeline automation setup completed!"
- )
- Pipeline Best Practices
- Modularity
-
- Each step should be independent
- Idempotency
-
- Tasks should be safely repeatable
- Error Handling
-
- Graceful degradation and alerting
- Versioning
-
- Track data, code, and model versions
- Monitoring
-
- Track execution metrics and logs
- Scheduling Strategies
- Daily
-
- Standard for daily retraining
- Weekly
-
- For larger feature engineering
- On-demand
-
- Triggered by data updates
- Real-time
- For streaming applications Deliverables Automated pipeline DAG Task dependency graph Execution logs and monitoring Performance metrics Rollback procedures Documentation